54d6e4
@@ -22,9 +22,11 @@
import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.regex.Pattern;
 import java.util.stream.StreamSupport;
@@ -171,6 +173,7 @@
public class KafkaConsumer extends DefaultConsumer {
         private final Pattern topicPattern;
         private final String threadId;
         private final Properties kafkaProps;
+        private final Map<String, Long> lastProcessedOffset = new ConcurrentHashMap<>();
 
         KafkaFetchRecords(String topicName, Pattern topicPattern, String id, Properties kafkaProps) {
             this.topicName = topicName;
@@ -294,6 +297,7 @@
public class KafkaConsumer extends DefaultConsumer {
                         long partitionLastOffset = -1;
 
                         Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();
+                        log.debug("Records count {} received for partition {}", allRecords.records(partition).size(), partition);
                         if (!breakOnErrorHit && recordIterator.hasNext()) {
                             ConsumerRecord<Object, Object> record;
 
@@ -341,6 +345,8 @@
public class KafkaConsumer extends DefaultConsumer {
                                 } else {
                                     // record was success so remember its offset
                                     partitionLastOffset = record.offset();
+                                    // lastProcessedOffset is read by the consumer rebalance listener (onPartitionsRevoked) to preserve offset state when a partition is revoked
+                                    lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset);
                                 }
                             }
 
@@ -418,12 +424,15 @@
public class KafkaConsumer extends DefaultConsumer {
             log.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName);
 
             StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
-            if (offsetRepository != null) {
                 for (TopicPartition partition : partitions) {
-                    long offset = consumer.position(partition);
-                    log.debug("Saving offset repository state {} from topic {} with offset: {}", threadId, topicName, offset);
-                    offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(offset));
+                String offsetKey = serializeOffsetKey(partition);
+                Long offset = lastProcessedOffset.get(offsetKey);
+                if (offset == null) {
+                    offset = -1l;
                 }
+                log.debug("Saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, offset);
+                commitOffset(offsetRepository, partition, offset, true);
+                lastProcessedOffset.remove(offsetKey);
             }
         }
 
